Глава 3: Параллелизация
Обзор паттерна «Параллелизация»
В предыдущих главах мы рассмотрели «Сцепление промптов» (Prompt Chaining) для последовательных рабочих процессов и «Маршрутизацию» для динамического выбора и переходов между различными путями. Хотя эти паттерны важны, многие сложные агентные задачи состоят из нескольких подзадач, которые можно выполнять одновременно, а не строго друг за другом. Именно здесь становится критически важен паттерн «Параллелизация».
Параллелизация — это одновременное выполнение нескольких компонентов, таких как вызовы LLM, использование инструментов или даже целых подагентов (см. рис. 1). Вместо ожидания завершения одного шага перед началом следующего, параллельное выполнение позволяет независимым задачам запускаться в одно и то же время, что значительно сокращает общее время выполнения для задач, разложимых на независимые части.
Рассмотрим агента, предназначенного для исследования темы и суммирования найденного. Последовательный подход может выглядеть так:
- Найти источник A.
- Суммировать источник A.
- Найти источник B.
- Суммировать источник B.
- Синтезировать финальный ответ из суммаризаций A и B.
Параллельный подход вместо этого мог бы:
- Искать источник A и искать источник B одновременно.
- Как только оба поиска завершены, суммировать источник A и суммировать источник B одновременно.
- Синтезировать финальный ответ из суммаризаций A и B (обычно этот шаг последовательный и ожидает завершения параллельных шагов).
Ключевая идея — выявлять части рабочего процесса, не зависящие от результатов других частей, и выполнять их параллельно. Это особенно эффективно при работе с внешними сервисами (например, API или базами данных), где есть задержки: можно рассылать несколько запросов одновременно.
Реализация параллелизации часто требует фреймворков с поддержкой асинхронного исполнения или многопоточности/многопроцессности. Современные агентные фреймворки спроектированы с учётом асинхронных операций и позволяют легко определять шаги, которые могут выполняться параллельно.
Рис. 1. Пример параллелизации с подагентами
Такие фреймворки, как LangChain, LangGraph и Google ADK, предоставляют механизмы для параллельного выполнения. В LangChain Expression Language (LCEL) параллельное выполнение достигается за счёт комбинирования runnable‑объектов с помощью операторов вроде | (для последовательности) и структурирования цепочек или графов так, чтобы ветви выполнялись одновременно. LangGraph, благодаря графовой структуре, позволяет определять несколько узлов, запускаемых из одного состояния, что фактически включает параллельные ветви в рабочем процессе. Google ADK предоставляет встроенные механизмы для упрощения и управления параллельным исполнением агентов, что существенно повышает эффективность и масштабируемость сложных мультиагентных систем. Эта нативная возможность позволяет разработчикам проектировать решения, где несколько агентов работают одновременно, а не последовательно.
Паттерн «Параллелизация» жизненно важен для повышения эффективности и отзывчивости агентных систем, особенно при выполнении задач, включающих множество независимых поисков, вычислений или взаимодействий с внешними сервисами. Это ключевая техника оптимизации производительности сложных агентных рабочих процессов.
Практические применения и сценарии
Параллелизация — мощный паттерн оптимизации производительности агентов во множестве приложений:
Сбор информации и исследования.
- Пример: агент, исследующий компанию.
- Параллельные задачи: поиск новостей, получение биржевых данных, мониторинг упоминаний в соцсетях и запрос к внутренней базе компании — всё одновременно.
- Польза: формирование целостной картины быстрее, чем при последовательных обращениях.
Обработка данных и аналитика.
- Пример: агент, анализирующий отзывы клиентов.
- Параллельные задачи: сентимент‑анализ, извлечение ключевых слов, категоризация отзывов, выявление срочных проблем — одновременно по пакету данных.
- Польза: многогранная аналитика быстрее.
Взаимодействие с несколькими API или инструментами.
- Пример: агент планирования путешествий.
- Параллельные задачи: проверить цены на авиабилеты, наличие отелей, локальные события и рекомендации ресторанов — одновременно.
- Польза: быстрый и полный план поездки.
Генерация контента из нескольких компонентов.
- Пример: агент, создающий маркетинговое письмо.
- Параллельные задачи: сгенерировать тему, черновик письма, подобрать изображение и текст для кнопки CTA — одновременно.
- Польза: быстрее собрать финальный материал.
Валидация и проверка.
- Пример: агент проверяет пользовательский ввод.
- Параллельные задачи: проверить формат email, валидировать телефон, сверить адрес с базой, проверить на ненормативную лексику — одновременно.
- Польза: ускоренная обратная связь о корректности.
Мультимодальная обработка.
- Пример: агент анализирует пост в соцсетях с текстом и изображением.
- Параллельные задачи: анализ текста на сентимент и ключевые слова и параллельный анализ изображения на объекты и сцену.
- Польза: более быстрое объединение инсайтов из разных модальностей.
A/B‑тестирование или генерация нескольких вариантов.
- Пример: агент генерирует креативные текстовые варианты.
- Параллельные задачи: сгенерировать три разных заголовка статьи одновременно с немного различающимися промптами или моделями.
- Польза: быстрый сравнительный отбор лучшего варианта.
Параллелизация — базовая техника оптимизации в агентном дизайне: она позволяет создавать более производительные и отзывчивые приложения, используя конкурентное исполнение независимых задач.
Практический пример кода (LangChain)
Параллельное исполнение в LangChain осуществляется при помощи LangChain Expression Language (LCEL). Основной метод — структурировать несколько runnable‑компонентов в словарь или список. Когда эта коллекция подаётся на вход следующего компонента цепочки, рантайм LCEL исполняет содержащиеся runnables одновременно.
В контексте LangGraph этот принцип реализуется через топологию графа. Параллельные рабочие процессы определяются как множественные узлы без прямых последовательных зависимостей, запускаемые из одного общего узла. Эти параллельные ветви выполняются независимо, прежде чем их результаты будут агрегированы на последующем узле‑конвергенции.
Ниже показана реализация параллельного процесса на LangChain. Этот рабочий процесс запускает две независимые операции параллельно в ответ на единичный пользовательский запрос. Параллельные процессы оформлены как отдельные цепочки/функции, а их выходы затем агрегируются в единый результат.
Для запуска требуются Python‑пакеты langchain, langchain‑community и поставщик модели, например langchain‑openai. Кроме того, в локальной среде должен быть настроен корректный API‑ключ выбранной модели.
import os
import asyncio
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
# --- Configuration ---
# Ensure your API key environment variable is set (e.g., OPENAI_API_KEY)
try:
llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
except Exception as e:
print(f"Error initializing language model: {e}")
llm = None
# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Summarize the following topic concisely:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
questions_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Generate three interesting questions about the following topic:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
terms_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "Identify 5-10 key terms from the following topic, separated by commas:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
# --- Build the Parallel + Synthesis Chain ---
# 1. Define the block of tasks to run in parallel. The results of these,
# along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
{
"summary": summarize_chain,
"questions": questions_chain,
"key_terms": terms_chain,
"topic": RunnablePassthrough(), # Pass the original topic through
}
)
# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """Based on the following information:
Summary: {summary}
Related Questions: {questions}
Key Terms: {key_terms}
Synthesize a comprehensive answer."""),
("user", "Original topic: {topic}")
])
# 3. Construct the full chain by piping the parallel results directly
# into the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()
# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
"""
Asynchronously invokes the parallel processing chain with a specific topic
and prints the synthesized result.
Args:
topic: The input topic to be processed by the LangChain chains.
"""
if not llm:
print("LLM not initialized. Cannot run example.")
return
print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")
try:
# The input to `ainvoke` is the single 'topic' string,
# then passed to each runnable in the `map_chain`.
response = await full_parallel_chain.ainvoke(topic)
print("\n--- Final Response ---")
print(response)
except Exception as e:
print(f"\nAn error occurred during chain execution: {e}")
if __name__ == "__main__":
test_topic = "The history of space exploration"
# In Python 3.7+, asyncio.run is the standard way to run an async function.
asyncio.run(run_parallel_example(test_topic))Приведённый Python‑код реализует приложение на LangChain для эффективной обработки темы за счёт параллельного исполнения. Обратите внимание, что asyncio обеспечивает конкуррентность, а не истинный параллелизм: на одном потоке переключение задач происходит во время ожиданий (например, сетевых запросов). В результате создаётся эффект одновременного прогресса нескольких задач, но исполнение кода продолжает ограничиваться одним потоком и GIL.
Код импортирует ключевые компоненты из langchain_openai и langchain_core: модели, промпты, парсеры и структуры runnable. Пытается инициализировать ChatOpenAI (модель «gpt‑4o‑mini») с заданной температурой; оборачивает инициализацию в try/except. Далее определяются три независимые «цепочки»: краткая суммаризация темы, генерация трёх вопросов по теме и извлечение 5–10 ключевых терминов (через соответствующие ChatPromptTemplate, модель и StrOutputParser).
Затем конструируется блок RunnableParallel, объединяющий три цепочки и пропускающий исходную тему через RunnablePassthrough. Определяется отдельный промпт синтеза, который принимает суммаризацию, вопросы, ключевые термины и оригинальную тему, чтобы сгенерировать итоговый ответ. Полная цепочка строится как последовательность: параллельный блок → промпт синтеза → модель → парсер строки. Асинхронная функция run_parallel_example демонстрирует вызов полной цепочки. В конце пример запускается для темы «The history of space exploration».
Практический пример кода (Google ADK)
Теперь посмотрим на конкретный пример этих идей в рамках Google ADK. Разберём, как примитивы ADK — такие как ParallelAgent и SequentialAgent — применяются для построения потока агента, использующего конкурентное исполнение для повышения эффективности.
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search
GEMINI_MODEL = "gemini-2.0-flash"
# --- 1. Define Researcher Sub-Agents (to run in parallel) ---
# Researcher 1: Renewable Energy
researcher_agent_1 = LlmAgent(
name="RenewableEnergyResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in energy.
Research the latest advancements in 'renewable energy sources'.
Use the Google Search tool provided. Summarize your key findings
concisely (1-2 sentences). Output *only* the summary.""",
description="Researches renewable energy sources.",
tools=[google_search],
# Store result in state for the merger agent
output_key="renewable_energy_result"
)
# Researcher 2: Electric Vehicles
researcher_agent_2 = LlmAgent(
name="EVResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in transportation.
Research the latest developments in 'electric vehicle technology'.
Use the Google Search tool provided. Summarize your key findings
concisely (1-2 sentences). Output *only* the summary.""",
description="Researches electric vehicle technology.",
tools=[google_search],
# Store result in state for the merger agent
output_key="ev_technology_result"
)
# Researcher 3: Carbon Capture
researcher_agent_3 = LlmAgent(
name="CarbonCaptureResearcher",
model=GEMINI_MODEL,
instruction="""You are an AI Research Assistant specializing in climate solutions.
Research the current state of 'carbon capture methods'.
Use the Google Search tool provided. Summarize your key findings
concisely (1-2 sentences). Output *only* the summary.""",
description="Researches carbon capture methods.",
tools=[google_search],
# Store result in state for the merger agent
output_key="carbon_capture_result"
)
# --- 2. Create the ParallelAgent (Runs researchers concurrently) ---
# This agent orchestrates the concurrent execution of the researchers.
# It finishes once all researchers have completed and stored their results in state.
parallel_research_agent = ParallelAgent(
name="ParallelWebResearchAgent",
sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
description="Runs multiple research agents in parallel to gather information."
)
# --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---
# This agent takes the results stored in the session state by the parallel agents
# and synthesizes them into a single, structured response with attributions.
merger_agent = LlmAgent(
name="SynthesisAgent",
model=GEMINI_MODEL, # Or potentially a more powerful model if needed for synthesis
instruction="""You are an AI Assistant responsible for combining research findings
into a structured report. Your primary task is to synthesize the following research
summaries, clearly attributing findings to their source areas. Structure your response
using headings for each topic. Ensure the report is coherent and integrates the key
points smoothly.
**Crucially: Your entire response MUST be grounded *exclusively* on the information
provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts,
or details not present in these specific summaries.**
**Input Summaries:**
* **Renewable Energy:**
{renewable_energy_result}
* **Electric Vehicles:**
{ev_technology_result}
* **Carbon Capture:**
{carbon_capture_result}
**Output Format:**
## Summary of Recent Sustainable Technology Advancements
### Renewable Energy Findings (Based on RenewableEnergyResearcher's findings)
[Synthesize and elaborate *only* on the renewable energy input summary provided above.]
### Electric Vehicle Findings (Based on EVResearcher's findings)
[Synthesize and elaborate *only* on the EV input summary provided above.]
### Carbon Capture Findings (Based on CarbonCaptureResearcher's findings)
[Synthesize and elaborate *only* on the carbon capture input summary provided above.]
### Overall Conclusion
[Provide a brief (1-2 sentence) concluding statement that connects *only* the
findings presented above.]
Output *only* the structured report following this format. Do not include
introductory or concluding phrases outside this structure, and strictly
adhere to using only the provided input summary content.""",
description="Combines research findings from parallel agents into a structured, "
"cited report, strictly grounded on provided inputs.",
# No tools needed for merging
# No output_key needed here, as its direct response is the final output of the sequence
)
# --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---
# This is the main agent that will be run. It first executes the ParallelAgent
# to populate the state, and then executes the MergerAgent to produce the final output.
sequential_pipeline_agent = SequentialAgent(
name="ResearchAndSynthesisPipeline",
# Run parallel research first, then merge
sub_agents=[parallel_research_agent, merger_agent],
description="Coordinates parallel research and synthesizes the results."
)
root_agent = sequential_pipeline_agentЭтот код определяет мультиагентную систему для исследования и синтеза сведений по устойчивым технологиям. Создаются три LlmAgent‑исследователя: по возобновляемой энергетике, электротранспорту и методам улавливания CO₂. Каждый использует инструмент google_search, суммирует итог в 1–2 предложениях и сохраняет результат в состояние с ключом.
Затем создаётся ParallelAgent, который запускает трёх исследователей параллельно и завершается после того, как они сохранят данные в состоянии. Далее определяется MergerAgent (тоже LlmAgent) — он берёт суммаризации из состояния и синтезирует структурированный отчёт. Инструкция подчёркивает строгое заземление на входных суммаризациях без добавления внешних фактов. Наконец, SequentialAgent оркестрирует общий поток: сначала выполняется параллельный сбор, затем — синтез отчёта.
Коротко
Что: Во многих агентных рабочих процессах есть несколько подзадач, которые нужно выполнить для достижения итоговой цели. Чисто последовательное выполнение, при котором каждый шаг ждёт завершения предыдущего, часто неэффективно и медленно. Особенно это тормозит, когда задачи зависят от внешнего I/O (API, БД): без конкуррентного исполнения суммарное время становится суммой длительностей всех шагов.
Почему: Паттерн «Параллелизация» даёт стандартный способ одновременно выполнять независимые задачи. Он выявляет компоненты рабочего процесса (вызовы инструментов или LLM), результаты которых не зависят друг от друга. Фреймворки вроде LangChain и Google ADK предоставляют встроенные конструкции для объявления и управления такими параллельными операциями. Запуская независимые задачи одновременно, а не последовательно, этот паттерн существенно снижает общую задержку.
Практическое правило: Используйте параллелизацию, когда в рабочем процессе есть несколько независимых операций, которые можно запускать одновременно, например: запросы к нескольким API, обработка разных фрагментов данных или генерация нескольких компонентов контента для последующего синтеза.

Рис. 2. Паттерн «Параллелизация»
Ключевые выводы
- Параллелизация — это одновременное выполнение независимых задач для повышения эффективности.
- Особенно полезна, когда задачи ждут внешние ресурсы (API‑вызовы, I/O).
- Переход к конкурентной/параллельной архитектуре повышает сложность и стоимость (дизайн, отладка, логирование).
- Фреймворки LangChain и Google ADK поддерживают определение и управление параллельным исполнением.
- В LCEL ключевая конструкция для параллели — RunnableParallel.
- В Google ADK параллель можно реализовать через LLM‑управляемую делегацию: координатор выявляет независимые подзадачи и запускает их специализированными подагентами.
- Параллелизация снижает суммарную задержку и делает агентные системы более отзывчивыми при сложных задачах.
Заключение
Паттерн параллелизации — способ оптимизации вычислительных рабочих процессов за счёт одновременного выполнения независимых подзадач. Такой подход снижает суммарную задержку, особенно в сценариях с несколькими инференсами моделей или вызовами внешних сервисов.
Фреймворки предоставляют разные механизмы реализации. В LangChain конструкция RunnableParallel позволяет явно определить и выполнить несколько цепочек одновременно. В Google Agent Developer Kit (ADK) параллель достигается через мультиагентную делегацию: координатор назначает подзадачи специализированным агентам, которые могут работать параллельно.
Комбинируя параллельную обработку с последовательными (chaining) и условными (routing) потоками, можно строить сложные и высокопроизводительные вычислительные системы, эффективно справляющиеся с разнообразными и комплексными задачами.
Ссылки
- Документация LCEL (параллелизм)
- Документация Google ADK (мультиагентные системы)
- Документация Python asyncio
Навигация
Назад: Глава 2. Маршрутизация
Вперед: Глава 4. Рефлексия